import geopandas as gpd
import plotly.express as px
from plotly.subplots import make_subplots
import pyproj
from datetime import datetime, timedelta
import happybase as hb
import pandas as pd
import io
from fiona.collection import BytesCollection
import plotly.graph_objects as go
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("putting_together").enableHiveSupport().getOrCreate()
23/01/08 12:43:32 WARN Utils: Your hostname, node1 resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s3) 23/01/08 12:43:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 23/01/08 12:43:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
(z serving layer, dane w Hive)
df = spark.sql("select * from aqicn_readings;")
df = df.dropDuplicates()
df_all = df.toPandas()
yesterday = datetime.now() - timedelta(days=1)
df = df.filter(df.datetime > yesterday).toPandas()
/home/vagrant/project/.venv/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py:178: FutureWarning: Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead series = series.astype(t, copy=False) /home/vagrant/project/.venv/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py:178: FutureWarning: Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead series = series.astype(t, copy=False)
df_all.head()
| station_id | station_name | latitude | longitude | location | datetime | aqi | pm10 | pm25 | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 365221 | Rudzka | 52.278000 | 20.972000 | Rudzka, Marymont-Kaskada, Bielany, Warsaw, Mas... | 2023-01-04 16:30:48 | 40 | 17 | 40 |
| 1 | 93862 | Jana Piekałkiewicza | 52.193804 | 21.051250 | Jana Piekałkiewicza, Marcelin, Mokotów, Warsaw... | 2023-01-04 16:00:45 | 57 | 19 | 57 |
| 2 | 76228 | Graniczna | 52.238812 | 21.002685 | Graniczna, IV, Śródmieście, Warsaw, Masovian V... | 2023-01-04 15:00:35 | 0 | 0 | 0 |
| 3 | 81664 | Motycka | 52.275087 | 21.044866 | Motycka 23, 23, Motycka, Targówek Mieszkaniowy... | 2023-01-04 15:16:46 | 57 | 29 | 57 |
| 4 | 358753 | Koszycka | 52.242000 | 20.944000 | Koszycka, Koło-Górczewska, Koło, Wola, Warsaw,... | 2023-01-04 16:17:59 | 26 | 9 | 26 |
df.head()
| station_id | station_name | latitude | longitude | location | datetime | aqi | pm10 | pm25 | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 112837 | Pułkownika Mieczysława Niedzielskiego "Żywiciela" | 52.268000 | 20.944000 | Pułkownika Mieczysława Niedzielskiego "Żywicie... | 2023-01-08 04:28:04 | 59 | 23 | 59 |
| 1 | 77956 | Jerzego Beniamina Flatta | 52.160000 | 21.066000 | Jerzego Beniamina Flatta, Wilanów Wysoki, Wila... | 2023-01-08 06:34:12 | 31 | 14 | 31 |
| 2 | 191677 | Encyklopedyczna | 52.308000 | 20.924000 | Encyklopedyczna, Młociny, Bielany, Warsaw, Mas... | 2023-01-08 08:42:37 | 31 | 14 | 31 |
| 3 | 103360 | Bergamotki | 52.177887 | 21.041421 | Bergamotki, Green Mokotów, Mokotów, Warsaw, Ma... | 2023-01-08 11:41:02 | 54 | 19 | 54 |
| 4 | 238336 | Głębocka | 52.310000 | 21.066000 | Głębocka, Lewandów, Grodzisk, Białołęka, Warsa... | 2023-01-07 20:27:36 | 61 | 15 | 61 |
df_all.shape, df.shape
((7296, 9), (2674, 9))
(z serving layer, dane w HBase)
con = hb.Connection("localhost")
con.open()
table = con.table("region_borders")
# do odpowiedniego wczytania shapefile .shp konieczne są wszystkie pliki pomocnicze
# - wszystkie są w jednej rodzinie kolumn
for ext in ['shp', 'shx', 'dbf', 'prj', 'cpg', 'qpj']:
binary = table.row(b"dzielnice_Warszawy")[f"map_files:{ext}".encode()]
with open(f"temp/temp.{ext}", "wb") as f:
f.write(binary)
data = gpd.read_file("temp/temp.shp")
data = data.to_crs(pyproj.CRS.from_epsg(4326))
data = data.set_index('nazwa_dzie')
data
| geometry | |
|---|---|
| nazwa_dzie | |
| Żoliborz | POLYGON ((20.95755 52.26693, 20.95760 52.26712... |
| Praga-Południe | POLYGON ((21.10347 52.22220, 21.10326 52.22206... |
| Mokotów | POLYGON ((21.02160 52.21305, 21.02172 52.21307... |
| Wola | POLYGON ((20.98121 52.25855, 20.98124 52.25838... |
| Wilanów | POLYGON ((21.10500 52.19428, 21.10509 52.19404... |
| Wesoła | POLYGON ((21.19234 52.23658, 21.18920 52.23786... |
| Wawer | POLYGON ((21.10801 52.18913, 21.10798 52.18918... |
| Włochy | POLYGON ((20.92000 52.21420, 20.92019 52.21408... |
| Ursynów | POLYGON ((20.98653 52.16231, 20.98702 52.16252... |
| Śródmieście | POLYGON ((21.06187 52.21720, 21.06172 52.21703... |
| Praga-Północ | POLYGON ((21.00546 52.26812, 21.00542 52.26826... |
| Ursus | POLYGON ((20.87858 52.20955, 20.87822 52.20932... |
| Targówek | POLYGON ((21.06202 52.30838, 21.06205 52.30830... |
| Rembertów | POLYGON ((21.14226 52.27855, 21.14906 52.27935... |
| Ochota | POLYGON ((21.00181 52.22771, 21.00197 52.22747... |
| Bielany | POLYGON ((20.92511 52.32894, 20.92511 52.32846... |
| Białołęka | POLYGON ((21.01270 52.29430, 21.01267 52.29426... |
| Bemowo | POLYGON ((20.88915 52.28046, 20.88917 52.28044... |
def extract_disctrict(location):
for d in data.index:
if d in location:
return d
df_unique = df.groupby(['station_id']).agg({"datetime":"max"}).reset_index().merge(df, on=['station_id', 'datetime'], how='left')
df_unique['district'] = df_unique.location.apply(extract_disctrict)
df_all['district'] = df_all.location.apply(extract_disctrict)
data["label"] = "Warszawa"
df_unique['location_short'] = df_unique.apply(lambda row: row['location'].split(f', {row.district}')[0], axis=1)
# background mapowy
fig = px.choropleth(data.reset_index(), locations="nazwa_dzie", geojson=data.geometry, color="label",
color_discrete_map={"Warszawa":"lightgrey"},
hover_name="nazwa_dzie", hover_data={"label": False, "nazwa_dzie": False})
fig.update_layout(height=800)
fig.update_geos(fitbounds='locations', projection_type='mercator', landcolor = 'rgb(255, 255, 255)')
fig2 = px.scatter_geo(df_unique, lat='latitude', lon='longitude', color='aqi', projection='mercator',
hover_name='station_name',
hover_data={'aqi': True, 'location_short': True, 'district': True,
'latitude': False, 'longitude': False})
fig2.update_geos(fitbounds='locations')
fig3 = fig.add_trace(fig2.data[0])
fig3.update_layout(coloraxis_colorbar={"len":0.5, "title": "AQI"})
fig3.show()
mean_values = df_unique.groupby(['district'])['aqi', 'pm10', 'pm25'].mean()
/tmp/ipykernel_26587/321958684.py:1: FutureWarning: Indexing with multiple keys (implicitly converted to a tuple of keys) will be deprecated, use a list instead.
data_with_means = data.merge(mean_values, right_index=True, left_index=True)
data_with_means.reset_index()
| index | geometry | label | aqi | pm10 | pm25 | |
|---|---|---|---|---|---|---|
| 0 | Żoliborz | POLYGON ((20.95755 52.26693, 20.95760 52.26712... | Warszawa | 51.000000 | 18.000000 | 51.000000 |
| 1 | Praga-Południe | POLYGON ((21.10347 52.22220, 21.10326 52.22206... | Warszawa | 81.000000 | 26.000000 | 81.000000 |
| 2 | Mokotów | POLYGON ((21.02160 52.21305, 21.02172 52.21307... | Warszawa | 57.875000 | 22.000000 | 57.875000 |
| 3 | Wola | POLYGON ((20.98121 52.25855, 20.98124 52.25838... | Warszawa | 45.500000 | 20.000000 | 45.500000 |
| 4 | Wilanów | POLYGON ((21.10500 52.19428, 21.10509 52.19404... | Warszawa | 52.000000 | 24.000000 | 52.000000 |
| 5 | Wesoła | POLYGON ((21.19234 52.23658, 21.18920 52.23786... | Warszawa | 56.000000 | 31.000000 | 56.000000 |
| 6 | Wawer | POLYGON ((21.10801 52.18913, 21.10798 52.18918... | Warszawa | 58.833333 | 23.833333 | 58.833333 |
| 7 | Włochy | POLYGON ((20.92000 52.21420, 20.92019 52.21408... | Warszawa | 52.000000 | 25.500000 | 52.000000 |
| 8 | Ursynów | POLYGON ((20.98653 52.16231, 20.98702 52.16252... | Warszawa | 46.333333 | 16.000000 | 46.333333 |
| 9 | Śródmieście | POLYGON ((21.06187 52.21720, 21.06172 52.21703... | Warszawa | 50.500000 | 21.500000 | 50.500000 |
| 10 | Praga-Północ | POLYGON ((21.00546 52.26812, 21.00542 52.26826... | Warszawa | 55.000000 | 27.000000 | 55.000000 |
| 11 | Ursus | POLYGON ((20.87858 52.20955, 20.87822 52.20932... | Warszawa | 53.000000 | 20.000000 | 53.000000 |
| 12 | Targówek | POLYGON ((21.06202 52.30838, 21.06205 52.30830... | Warszawa | 61.000000 | 27.500000 | 61.000000 |
| 13 | Rembertów | POLYGON ((21.14226 52.27855, 21.14906 52.27935... | Warszawa | 25.000000 | 9.000000 | 25.000000 |
| 14 | Ochota | POLYGON ((21.00181 52.22771, 21.00197 52.22747... | Warszawa | 69.000000 | 27.000000 | 69.000000 |
| 15 | Bielany | POLYGON ((20.92511 52.32894, 20.92511 52.32846... | Warszawa | 52.750000 | 21.500000 | 52.750000 |
| 16 | Białołęka | POLYGON ((21.01270 52.29430, 21.01267 52.29426... | Warszawa | 92.166667 | 45.166667 | 92.166667 |
| 17 | Bemowo | POLYGON ((20.88915 52.28046, 20.88917 52.28044... | Warszawa | 59.800000 | 28.800000 | 59.800000 |
fig = px.choropleth(locations=data_with_means.index, geojson=data_with_means.geometry, color=data_with_means.aqi,
hover_name=data_with_means.index)
fig.update_layout(height=800, title_text="Mean AQI values")
fig.update_geos(fitbounds='locations', projection_type="mercator", landcolor = "rgb(255, 255, 255)")
fig.update_layout(coloraxis_colorbar={"len":0.5, "title": "AQI"})
fig.show()
fig = px.choropleth(locations=data_with_means.index, geojson=data_with_means.geometry, color=data_with_means.pm25,
hover_name=data_with_means.index)
fig.update_layout(height=800, title_text="Mean PM2.5 values")
fig.update_geos(fitbounds='locations', projection_type="mercator", landcolor = "rgb(255, 255, 255)")
fig.update_layout(coloraxis_colorbar={"len":0.5, "title": "PM2.5"})
fig.show()
fig = px.choropleth(locations=data_with_means.index, geojson=data_with_means.geometry, color=data_with_means.pm10,
hover_name=data_with_means.index)
fig.update_layout(height=800, title_text="Mean PM10 values")
fig.update_geos(fitbounds='locations', projection_type="mercator", landcolor = "rgb(255, 255, 255)")
fig.update_layout(coloraxis_colorbar={"len":0.5, "title": "PM10"})
fig.show()
fig = px.line(df.sort_values('datetime'), x='datetime', y='aqi', color='station_name')
fig.show()
df_all = pd.concat([df_all, df_all.loc[df_all.groupby('station_id')["datetime"].idxmax().to_list()]])
fig = go.Figure()
fig.add_trace(
go.Violin(
x=df_all["district"],
y=df_all["aqi"],
text=df_all["station_name"],
points="all",
box_visible=True,
meanline_visible=True,
pointpos=1.5,
jitter=0.7,
selectedpoints=df_all.groupby('station_id')["datetime"].idxmax().to_list(),
selected={"marker_color": "#371ea3", "marker_opacity": 1.0},
unselected={"marker_opacity": 0.1},
hovertemplate="<b>%{text}</b><br>AQI: %{y}<extra>%{x}</extra>",
hoveron="points",
))
fig.update_layout(yaxis_title="AQI")
fig.show()